package com.atguigu.one.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.one.bean.UserLoginBean;
import com.atguigu.one.utils.DateFormatUtil;
import com.atguigu.one.utils.MyClickHouseUtil;
import com.atguigu.one.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Flink streaming job that aggregates user-login metrics per 10-second tumbling
 * event-time window and writes them to ClickHouse.
 *
 * <p>Pipeline: read page-log JSON strings from Kafka, keep login-like records,
 * assign event-time watermarks, key by uid, de-duplicate per user per day with
 * keyed state, sum the per-record counters in a global window, and sink the
 * result row.
 *
 * <p>Emitted counters:
 * <ul>
 *   <li>{@code uuCt} - 1 for a user's first qualifying record on a given date.</li>
 *   <li>{@code backCt} - 1 when, in addition, more than
 *       {@link #BACKFLOW_THRESHOLD_DAYS} whole days have elapsed since the start
 *       of the previously stored login date.</li>
 * </ul>
 */
public class DwsUserUserLoginWindow {
    /** Milliseconds in one day; converts a timestamp gap into whole days. */
    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000L;

    /** A user counts as "returning" when the gap in whole days is strictly greater than this. */
    private static final long BACKFLOW_THRESHOLD_DAYS = 7L;

    /**
     * Builds and launches the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        //TODO 1. Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpointing / state backend configuration (currently disabled)
        //        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        //        env.getCheckpointConfig().setCheckpointTimeout(60*1000L);
        //        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        //        env.getCheckpointConfig().enableExternalizedCheckpoints(
        //                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        //        );
        //        env.setRestartStrategy(
        //                RestartStrategies.failureRateRestart(10, Time.of(1L, TimeUnit.DAYS),Time.of(3L,TimeUnit.DAYS))
        //        );
        //        env.setStateBackend(new HashMapStateBackend());
        //        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall/ck");
        //        System.setProperty("HADOOP_USER_NAME","atguigu");

        //TODO 2. Read the DWD page-log topic from Kafka as a stream
        String pageTopic = "dwd_traffic_page_log";
        String groupId = "user_login";
        DataStreamSource<String> pageLogDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(pageTopic, groupId));

        //TODO 3. Parse each record into a JSON object and filter
        SingleOutputStreamOperator<JSONObject> jsonObjectDS =
                pageLogDS.flatMap(new FlatMapFunction<String, JSONObject>() {
                    @Override
                    public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                        JSONObject jsonObject = JSON.parseObject(value);
                        // Guard in case the parser yields null (presumably for empty payloads);
                        // dereferencing it would fail the whole job.
                        if (jsonObject == null) {
                            return;
                        }

                        JSONObject common = jsonObject.getJSONObject("common");
                        JSONObject page = jsonObject.getJSONObject("page");
                        // A record without either section cannot be evaluated; drop it
                        // instead of throwing a NullPointerException.
                        if (common == null || page == null) {
                            return;
                        }

                        String uid = common.getString("uid");
                        String lastPageId = page.getString("last_page_id");

                        // Keep records that carry a uid and either have no previous page
                        // or were reached from the "login" page.
                        if (uid != null && (lastPageId == null || "login".equals(lastPageId))) {
                            // Downstream steps unbox "ts" to a primitive long, so a record
                            // without it is dropped here rather than crashing the
                            // timestamp assigner.
                            if (jsonObject.getLong("ts") != null) {
                                out.collect(jsonObject);
                            }
                        }
                    }
                });

        //TODO 4. Extract the event timestamp and generate watermarks (2 s bounded out-of-orderness)
        SingleOutputStreamOperator<JSONObject> jsonObjectWithWmDS = jsonObjectDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                // "ts" is guaranteed present by the filter in step 3.
                                return element.getLong("ts");
                            }
                        })
        );

        //TODO 5. Key the stream by uid
        KeyedStream<JSONObject, String> keyedStream =
                jsonObjectWithWmDS.keyBy(json -> json.getJSONObject("common").getString("uid"));

        //TODO 6. De-duplicate with keyed state and convert to the JavaBean
        SingleOutputStreamOperator<UserLoginBean> userLoginDS =
                keyedStream.flatMap(new RichFlatMapFunction<JSONObject, UserLoginBean>() {
                    // Per-uid date string of the most recently recorded login.
                    private ValueState<String> lastLoginState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        lastLoginState = getRuntimeContext().getState(
                                new ValueStateDescriptor<String>("last_login",String.class)
                        );
                    }

                    @Override
                    public void flatMap(JSONObject value, Collector<UserLoginBean> out) throws Exception {
                        // Date held in state and date of the current record
                        String lastDt = lastLoginState.value();
                        Long curTs = value.getLong("ts");
                        String curDt = DateFormatUtil.toDate(curTs);

                        // Returning ("backflow") user count
                        long backCt = 0L;
                        // Unique user count
                        long uuCt = 0L;

                        // De-duplication: count a user at most once per stored date
                        if (lastDt == null) {
                            // First record ever seen for this uid
                            uuCt = 1L;
                            lastLoginState.update(curDt);
                        } else if (!lastDt.equals(curDt)) {
                            // NOTE(review): this branch also fires for an out-of-order record
                            // from an EARLIER date, which rewinds the state and lets the later
                            // date be counted again - confirm whether that is acceptable.
                            uuCt = 1L;
                            lastLoginState.update(curDt);
                            long gapDays = (curTs - DateFormatUtil.toTs(lastDt, false)) / MILLIS_PER_DAY;
                            if (gapDays > BACKFLOW_THRESHOLD_DAYS) {
                                backCt = 1L;
                            }
                        }

                        // Emit only when this record contributes a unique-user count;
                        // window start/end and ts are filled in by the window function.
                        if (uuCt == 1L) {
                            out.collect(new UserLoginBean("", "", backCt, uuCt, null));
                        }
                    }
                });

        //TODO 7. Window and aggregate: 10 s tumbling event-time window over the whole stream
        SingleOutputStreamOperator<UserLoginBean> resultDS =
                userLoginDS.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                        .reduce(new ReduceFunction<UserLoginBean>() {
                                    @Override
                                    public UserLoginBean reduce(UserLoginBean value1, UserLoginBean value2) throws Exception {
                                        // Accumulate both counters into the first bean
                                        value1.setBackCt(value1.getBackCt() + value2.getBackCt());
                                        value1.setUuCt(value1.getUuCt() + value2.getUuCt());
                                        return value1;
                                    }
                                },
                                new AllWindowFunction<UserLoginBean, UserLoginBean, TimeWindow>() {
                                    @Override
                                    public void apply(TimeWindow window, Iterable<UserLoginBean> values,
                                                      Collector<UserLoginBean> out) throws Exception {
                                        // The iterable holds the single pre-aggregated bean from reduce
                                        UserLoginBean value = values.iterator().next();

                                        // Fill in processing time and the window boundaries
                                        value.setTs(System.currentTimeMillis());
                                        value.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                                        value.setStt(DateFormatUtil.toYmdHms(window.getStart()));

                                        // Emit the result row
                                        out.collect(value);
                                    }
                                });
        resultDS.print("result>>>>>");

        //TODO 8. Write the result to ClickHouse
        resultDS.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_user_user_login_window values(?,?,?,?,?)"));

        //TODO 9. Launch the job
        env.execute();
    }
}